查看原文
其他

一学就会:Kotlin Flow 实战举例

AndroidPub 2022-09-20

作者:搬砖小子出现了
原文:https://juejin.cn/post/7088622810196607006

1. 引言

Flow 是 Kotlin 官方基于协程构建的用于响应式编程的API。响应式编程简单来说就是使用异步数据流进行编程 。协程中,使用挂起函数仅可以异步返回单个值,而 Flow 则可以异步返回多个值,并补全kotlin语言中响应式编程的空白。

比如压缩图片需要执行多个异步任务,完成一个通知一下,不借助 Kotlin Flow,我们可能会使用线程池 + 回调的方式执行 :

Iterator<InputStreamProvider> iterator = mStreamProviders.iterator();

while (iterator.hasNext()) {
  final InputStreamProvider path = iterator.next();

  AsyncTask.SERIAL_EXECUTOR.execute(new Runnable() {
    @Override
    public void run() {
      try {
        File result = compress(context, path);
        mHandler.sendMessage(...);
      } catch (IOException e) {
        mHandler.sendMessage(...);
      }
    }
  });

  iterator.remove();
}

// 使用:
LubanBuilder().load(path)
.setCompressListener(object : OnCompressListener {
        override fun onSuccess(file: File) {
          ...
        }
}).launch()

而如果你用 Kotlin Flow 一切都变得那么简单明了:

//构建
fun zipImages(paths:List<String>):Flow<Result<File>>{
 return
      paths.map{ path->
           flow {
              emit(compress(context, path))
           }.catch{ exception ->
              emit(Result.Error(exception))
           }
      }.merge().flowOn(Dispaters.IO)
}

//监听
launch{
    zipImages().collect{ result->
         when(result){
              is Result.Success ->{
              }
              
              is Result.Error ->{
              }
         }
          ...
    }
}

而如果,单纯使用挂起函数我们无法返回多个数值,例如我们将一个回调改造成挂起函数,

interface  SimpleInterface {
    fun onReceive(value: Int)
}

suspend  fun simpleSuspend()Int {
    return suspendCoroutine { coroutine ->
 val callback = object : SimpleInterface {
            override  fun onReceive(value: Int) {
                coroutine.resume(value)
            }
        }
        callback.onReceive(1)
        //再来一次 !
        callback.onReceive(2)
    }
}

如果,我们尝试resume多次,此时协程则会抛异常:

那么,Flow 仅仅是能返回多个值就值得如此力荐?当然不是,推荐它的原因更多是它丰富的操作符,用 Flow 能低成本的异步处理数据,下面让我们结合项目实例来看看它有哪些优势。

首先,我们要知道 Flow 分两种:

冷流 🥶热流 🥵
不消费,不生产,多次消费,多次生产,只有1个观察者有没有消费者都会生产数据

2. 冷流

2.1 流的构建

各种冷流的构建姿势

flowOf(1,2,3)

list(1,2,3).asFlow()

flow {
    emit(1)
}


//回调改造使用callbackFlow
callbackFlow {
   send(value)
   awaitClose { }
}

//在一般的flow在构造代码块中不允许切换线程,ChannelFlow则允许内部切换线程
channelFlow{
    send("hello")
    
    withContext(Dispatchers.IO) {
    send("channel flow")
    }
}

2.2 流的监听

官方提供了很多触发流执行的操作符,这种都是在调用链的末尾处,所以一般也称之为末端操作符:

//构建 
val simpleFlow = flow {
                      emit(1)
                      emit(2)
                 }


//使用 ,注意 collect 是个挂起函数,collect 后面如果有代码 不会立即执行
coroutineScope.launch{ 
    simpleFlow.collect{ value->
          println(value)
    }
}

//输出
1
2

推荐使用 onEach + launchIn 因为 collect 是挂起函数,后面如果有代码可能不被立即执行。

终端操作符包括 collectcollectIndexedcollectLatesttoListtoSetlastfirstlaunchIn 等等,更多操作符参考 《Kotlin Flow 操作符:篇幅很大 你忍一下

一般的 flow 是 “冷”的,即不消费则不生产,多次消费多次生产

顺带看下 官方提供的API 的简洁之处 :

2.3 流的取消

flow 是基于协程的,因此其生命周期是和 CoroutineScope 挂钩的。

fun simple(): Flow<Int> = flow { 
    for (i in 1..3) {
        delay(100)          
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { // 在 250 毫秒后超时
        simple().collect { value -> println(value) } 
    }
    println("Done")
}

Emitting 1
1
Emitting 2
2
Done

通过 launchIn 操作符我们还能拿到 Job ,来自行控制 Flow 的取消:

val job = simple().onEach { value ->
 println(value)
} .launchIn(this)

launch {
delay(250)
    job.cancel()
}

//输出
Emitting 1
1
Emitting 2
2

一般来说我们不需要关心流的生命周期,在Android上我们通常会使用LifecycleScope 或者 ViewModelScope ,因此在页面关闭时这些Flow 都会被取消。

2.4 流的异常

为了保证流的透明,flow 构造内禁止构建 try catch ,可以使用catch操作符来捕获异常


flow {
  emit(1)
  throw Exception("test")
}.catch { e->
   ...
}

// 可以继续在catch 里 throw 移除
// 也可以调用 emit 将异常转化为值 发出去
// 也可以只打印日志 

2.5 流的背压

背压(Back Pressure),就是生产速率大于了消费速率。这个问题得益于 suspend 的魔力,flow 会将生产端挂起,同时我们也可以使用 buffer 操作符,将数据添加缓冲区,避免挂起。

listOf(1,2,3,4,5).asFlow().onEach {
        delay(100)
    }.buffer(capacity = 2, onBufferOverflow = BufferOverflow.SUSPEND)
    .collect {
        delay(500)
    } 
  • capacity : 缓冲区容量 默认 64
  • onBufferOverflow :超出缓冲区之后的策略 ,有 挂起,抛弃最新,抛弃最旧 三种策略

还有 conflatecollectLatest 等操作符可供使用,不过都是 buffer 的封装。

2.6 更多操作符

官方提供了其他大量简洁好用的操作符,这里结合实际例子来介绍部分操作符简化开发工作的实例:

  • 数据防抖
  • catch
  • retry 失败重试
  • 线程切换
  • 把上面的操作结合到一起,封装一下(copy from iosched)

之后我们简单网络请求就可以这样写了:

//定义  usecase
class PopupUseCase : FlowUseCase<Unit, GetPopupsData>(CommonIOPool) {

    private val service by requestService(PopupApiClient::class.java)

    override fun execute(parameters: Unit): Flow<Result<GetPopupsData>> {
        return service.getPopups().asFlowResult()
    }
}

//在ViewModel 中使用
private val popupUseCase = PopupUseCase()

popupUseCase(Unit).onSuccess { result ->
...
    }.onLoading{
       ...
    }.onFail{
       ...
    }. launchIn(viewModelScope) 

onSuccess,onLoading,onFail 是项目中自己的封装。

fun <T> Flow<Result<T>>.onSuccess(onSuccess: (T) -> Unit): Flow<Result<T>> {
    return this.onEach { result ->
        if (result is Result.Success) {
            onSuccess.invoke(result.data)
        }
    }
}

更多更全操作符请参阅:《Kotlin Flow 操作符:篇幅很大 你忍一下

3. 热流

介绍热流之前我们先看一下常用的 LiveData 有什么问题,因为这些问题也正好是热流的优势所在,LiveData 的优势是上手简单,但是同事存在不少缺点:

  • 不支持背压,快速postValue 只能收到最后一次的回调
  • 粘性事件,当配置变更时再次绑定会立即收到上次的值,如果用来处理事件就会有问题
  • 观察只能在主线程
  • 提供的 Transformations.map / switchMap 都是在主线程操作
  • 没有操作符来做复杂转换
  • 和 Android 组件绑定 ,不利于单元测试

前面介绍的冷流是单播,即一次消费对应一次生产。而实际开发中也有许多多播 + 热流的需求,LiveData 就属于多播+热流,Flow 的热流通过 SharedFlow 和 StateFlow 实现:

3.1 SharedFlow

val hotData = MutableSharedFlow<Int>(replay = 1
                                    extraBufferCapacity = 64 ,
                                    onBufferOverflow = BufferOverflow.DROP_OLDEST)

hotData.onEach{ value->
    println("1号观察者 观察到:$value")
}.launchIn(coroutineScope)

launch {
hotData.emit(1//emit 是个挂起函数
}

hotData.onEach{ value->
    println("2号观察者 观察到:$value")
}.launchIn(coroutineScope)

launch {
hotData.emit(2)
}

//输出
2号观察者 观察到:1
1号观察者 观察到:1
1号观察者 观察到:2
2号观察者 观察到:2

//如果 replay = 0 
1号观察者 观察到:1
1号观察者 观察到:2
2号观察者 观察到:2

上面说到如果我们用 LiveData 是“粘性事件”,新订阅者会理解收到之前的值,如我们使用LiveData 控制 Toast ,则会再次弹出。LiveData 会保证订阅者总能在值变化的时候观察到最新的值,并且每个初次订阅的观察者都会执行一次回调方法。这样的特性对于维持 UI 和数据的一致性没有任何问题,但想要观察 LiveData 来发射一次性的事件就超出了其能力范围。

此时,我们可以利用 SharedFlow 来处理一次性事件:

- 当 replay = 0 时(默认也为0 ),我们完全可以用SharedFlow来当做事件发送载体,不用担心被重放
- 需要注意 emit 与 tryEmit ,二者差别巨大,一般情况建议用 emit, 背后原理下期分析
- 项目实战: 点击ViewBinder中的卡片打开子页面

private val _openReviewFragmentEvent = MutableSharedFlow<Unit>()
val openReviewFragmentEvent = _openReviewFragmentEvent.asSharedFlow()

//观察事件
viewModel.openReviewFragmentEvent.onEach {
    toggleReviewFragment()
} .launchWhenResumed(lifecycleScope)

//发送事件
viewModel {
    _openReviewFragmentEvent.emit(Unit
}

即使手机配置变更,此处也不会再次回调,是用作事件发送的简单手段。如果你不想事件重复消费,可以使用 channel + flow 的方式处理。

项目实战1:数据缓存池

之前有个文字聊天室的需求,定时轮询拉取聊天消息,每次拉取 20条,缓存池 200 ,满了就丢掉旧数据,然后间隔 500ms 展示一条数据。当时写了很长的代码,现在使用 SharedFlow 可以轻松实现 ,甚至进行更多定制:

//定义消息池
val messagePool = MutableSharedFlow<Int>(replay = 0 , 
                                    extraBufferCapacity = 200 ,
                                    onBufferOverflow = BufferOverflow.DROP_OLDEST)
//发送数据
mesaagePool.emit(message)


//消费数据
mesaagePool.onEach{
  delay(500)
  ...
}.launchIn(coroutineScope)

SharedFlow 加上 LifecycleScope 你甚至可以用 SharedFlow 改造成 FlowEventBus : FlowEventBus,参考:《打造一个 Kotlin Flow 版的 EventBus

3.2 StateFlow

StateFlow 是 SharedFlow 的一种特殊实现,replay=1 , 无缓存配置,DROP_OLDEST。功能和定位与 LiveData 相似,相同点在于:

  • 允许多个观察者
  • 有只读和可变两种类型
  • replay = 1

但是和 LiveData 不同点在于 :

  • 必须配置初始值
  • value 空安全
  • Flow丰富的异步数据流操作
  • 默认数据防抖(连续相同的值不会回调)

例如我们使用 StateFlow 替代 LiveData 管理 ViewModel 中的状态:

// viewModel 中定义 flow
private val _pageState = MutableStateFlow<Result<Unit>>(Result.Loading)
val pageState: StateFlow<Result<Unit>> = _pageState.asStateFlow()


// 页面里注册观察
viewModel.pageState.onSuccess {
...
} .launchWhenResumed(lifecycleScope)


//viewModel 获取数据后设置值
repository.getResult(...).onStart  {
_pageState.value = Result.Loading
} .onSuccess  { result ->
_pageState.value = Result.Success(Unit)
} .onFail  { exception ->
_pageState.value = Result.Error(exception)
} .launchIn(viewModelScope)

使用起来和 LiveData差 不多,但结合Flow 丰富的操作符,就能解决更多问题了:

项目实战2:搜索框防抖

val _searchQuery = MutableStateFlow(EMPTY)

object : NormalTextWatcher() {
    override fun afterTextChanged(text: Editable?) {
        _searchQuery.value = text.toString()
    }


_searchQuery.filter { it.isNotEmpty() } // 过滤空内容,避免无效网络请求
            .debounce(300// 300ms防抖
            .flatMapLatest { searchFlow(it.toString()) } //执行搜索并且新搜索覆盖旧搜索
            .flowOn(Dispatchers.IO) // 让搜索在异步线程中执行
            .onEach { updateUi(it) } // 获取搜索结果并更新界面
            .launchIn(mainScope) // 在主线程收集搜索结果// 更新界面fun updateUi(it: List<String>) {}
复制代码

debounce : 指定时间内的值只接收最新的一个

SharedFlow 和 StateFlow 怎么选?

  • 在Android 开发中, StateFlow 效果和LiveData等同,用于UI 数据绑定即可
  • SharedFlow 功能更强大,按需使用,一般可以用作事件广播

4. Flow 的其他应用

项目实战3:回调改造

 fun uploadFiles(files: List<File>): Flow<UploadPicResult> {
    return callbackFlow  {
UploadImageWorker().upload(files.map { file-> UploadPicInfo(file.name, file.absolutePath) } ,
            object : IUploadPicListener {
            
                override fun onSingleUploadSuccess(result: UploadPicResult) {
                    trySendBlocking(result)
                }

                override fun onSingleUploadFailure(result: UploadPicResult?) {
                }
                
                override fun onUploadComplete() {
                     close()  //flow 发送结束,关闭通道
                }
            })
        awaitClose {
        //如果回调需要解注册,可以在这里操作
         }
}
}

项目实战4:ViewPager2

在 ViewPager2 中不可见的Fragment生命周期是 onPause ,对于 LiveData 而言 onPause 仍属于活跃状态,仍会收到事件回调。但是如果使用 Lifecycle ktx 里提供的 LaunchWhenX 系列 搭配 Flow 就没这个问题啦。

lifecycleScope.launchWhenResumed {
flow.collect { value ->
     println(value)
    }
}

//项目中已经封装了方法,也可以按以下方式调用,少点括号
flow.onEach{ value ->
    println(value)
}.launchWhenResumed(lifecycleScope)

因为 flow 的 collect 是个挂起函数,当被 pause时 就会被挂起,不会收到回调啦。但这个只是粗暴的挂起,我们可以使用Lifecycle-ktx 2.4.0 推出的API repeatOnLifecycle 来进行观察,这个方法会在对应的生命周期 进行重复执行 和 取消,这样可以减少资源的浪费。

lifecycleScope.launch {
 lifecycle.repeatOnLifecycle(Lifecycle.State.RESUMED) {
        flow.collect {

        }
    }
}
//每次都这么写也太麻烦了 ,官方为Flow封装了一个扩展方法
flow.flowWithLifecycle(lifecycle,Lifecycle.State.RESUMED)

一言蔽之 :launchWhenX 暂停协程的执行,repeatOnLifecycle 取消并重新启动新的协程

项目实战5:压缩上传图片

 draft.getImagesPath().map { path ->
    flow {
          //压缩文件
         emit(zipImage(draft.skuId, path))
    }
}.flatten().merge().flatMapMerge(6) { zipFile ->
    flow {
        //上传文件
         emit(uploadFiles(zipFile))
    }
}.catch { exception ->
    Logger.d(TAG, exception.toString())
}.retry(3).cancellable().flowOn(CommonIOPool)

5. 总结

  • LiveData 适用于简单的UI绑定场景。
  • Flow 提供了大量的操作符来简化我们的开发。
  • SharedFlow 和 StateFlow 前者用于处理 Event,后者用于处理 State。

对标 LiveData 的是 StateFlow ,Flow 本身定位是类似 RxJava 是用于响应式编程的 API,既然 StateFlow 能做 LiveData 的活,并且功能更强大、可以简化数据处理,用它何乐而不为呢。


END

推荐文章 


 

欢迎进群



您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存